package com.atguigu.gmall.realtime.function;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.util.DimUtil;
import com.atguigu.gmall.realtime.util.JdbcUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;

/**
 * @Author lzc
 * @Date 2023/2/21 08:45
 */
public abstract class DimFunction<T> extends RichMapFunction<T, T> {
    
    private Connection conn;
    
    @Override
    public void open(Configuration parameters) throws Exception {
        conn = JdbcUtil.getPhoenixConnection();
    }
    
    @Override
    public void close() throws Exception {
        JdbcUtil.closeConnection(conn);
        
    }
    
    public abstract String getTable();
    
    public abstract Object getId(T value);
    
    public abstract void addDim(T value, JSONObject dim);
    
    @Override
    public T map(T value) throws Exception {
        // 1. 从 phoenix 读取 dim 数据
       JSONObject dim = DimUtil.readDimFromPhoenix(conn, getTable(), getId(value));
       
       // 2. 给 value 中的维度字段赋值
       addDim(value, dim);
       
    
        return value;
    }
}
